#! /usr/bin/env python3

import os
import subprocess
import sys
import platform
import argparse
import re
import threading
import multiprocessing
from multiprocessing.pool import ThreadPool
from glob import glob

# Location of the MicroPython executable under test.  The environment variable
# MICROPY_MICROPYTHON overrides the per-platform default path.
MICROPYTHON = os.environ.get(
    'MICROPY_MICROPYTHON',
    'micropython/ports/windows/micropython.exe' if os.name == 'nt'
    else 'micropython/ports/unix/micropython',
)

# Location of mpy-cross (overridable through MICROPY_MPYCROSS); it is only
# needed if the --via-mpy command-line arg is passed
MPYCROSS = os.environ.get('MICROPY_MPYCROSS', '../mpy-cross/mpy-cross')

# Force utf-8 for Python I/O so that a different locale encoding on the host
# does not affect the output of Python subprocesses
os.environ['PYTHONIOENCODING'] = 'utf-8'

def rm_f(fname):
    """Remove the file *fname*; a file that does not exist is silently ignored.

    The removal is attempted directly instead of testing for existence first,
    so a file deleted concurrently (tests may run in several threads) cannot
    make this raise.  Any other OSError from os.remove() still propagates.
    """
    try:
        os.remove(fname)
    except FileNotFoundError:
        pass


# unescape wanted regex chars and escape unwanted ones
def convert_regex_escapes(line):
    """Turn one line of an expected-output file into a regex pattern.

    line -- the line as utf-8 encoded bytes.

    Returns the pattern as utf-8 encoded bytes, built as follows:
      * a backslash is dropped and the character after it is copied verbatim,
        so an escaped character keeps whatever meaning it has in a regex;
      * the unescaped characters ( ) [ ] { } . * + ^ $ get a backslash put in
        front of them so that they match literally;
      * a final newline is preceded by "\\r*" so that carriage-return(s)
        before the end of line are accepted.

    A trailing backslash with nothing after it is dropped.  Empty input yields
    an empty pattern.
    """
    literal_chars = ('(', ')', '[', ']', '{', '}', '.', '*', '+', '^', '$')
    cs = []
    escape = False
    for c in str(line, 'utf8'):
        if escape:
            escape = False
            cs.append(c)
        elif c == '\\':
            escape = True
        elif c in literal_chars:
            cs.append('\\' + c)
        else:
            cs.append(c)
    # accept carriage-return(s) before final newline; cs may be empty when the
    # input is empty or consists of a lone backslash
    if cs and cs[-1] == '\n':
        cs[-1] = '\r*\n'
    return bytes(''.join(cs), 'utf8')


def run_micropython(pyb, args, test_file, is_special=False):
    r"""Run *test_file* under MicroPython and return its output as bytes.

    pyb        -- None to run the MICROPYTHON executable on this machine;
                  otherwise an object providing enter_raw_repl() and
                  execfile(), on which the test is executed instead.
    args       -- parsed command-line options; args.emit, args.heapsize and
                  args.via_mpy are read for a standard local run.
    test_file  -- path of the test script.
    is_special -- handle the test as a "special" one.  Local tests whose path
                  starts with 'cmdline/' or 'feature_check/', or which are in
                  the special_tests tuple below, are always special.

    Special tests are started with only the options found on a leading
    "# cmdline:" line of the script, tests with 'repl_' in their path are fed
    line by line through a pseudo terminal, and the output is afterwards
    matched line by line against <test_file>.exp (which must exist): every
    output line matched by the regex made from the corresponding expected line
    is replaced by that expected line, and an expected line "########" stands
    for zero or more whole output lines.

    Returns b'SKIP\n' if the pty module is unavailable for a REPL test and
    b'CRASH' if the interpreter exits with a non-zero status (or the board
    raises PyboardError).  Output equal to b'SKIP\n' or b'CRASH' is returned
    unmodified; any other output has "\r\n" replaced by "\n".
    """
    special_tests = (
        'micropython/meminfo.py', 'basics/bytes_compare3.py',
        'basics/builtin_help.py', 'thread/thread_exc2.py',
    )
    had_crash = False
    if pyb is None:
        # run on PC
        if test_file.startswith(('cmdline/', 'feature_check/')) or test_file in special_tests:
            # special handling for tests of the unix cmdline program
            is_special = True

        if is_special:
            # check for any cmdline options needed for this test
            cmdlist = [MICROPYTHON]
            with open(test_file, 'rb') as f:
                line = f.readline()
                if line.startswith(b'# cmdline:'):
                    # subprocess.check_output on Windows only accepts strings, not bytes
                    cmdlist += [str(c, 'utf-8') for c in line[10:].strip().split()]

            # run the test, possibly with redirected input
            try:
                if 'repl_' in test_file:
                    # Need to use a PTY to test command line editing
                    try:
                        import pty
                    except ImportError:
                        # in case pty module is not available, like on Windows
                        return b'SKIP\n'
                    import select

                    def get(required=False):
                        # Collect everything the subprocess has written so far.  Reading
                        # stops once nothing arrives within 0.02s; with required=True it
                        # keeps waiting until at least some data has been received.
                        rv = b''
                        while True:
                            ready = select.select([emulator], [], [], 0.02)
                            if ready[0] == [emulator]:
                                rv += os.read(emulator, 1024)
                            else:
                                if not required or rv:
                                    return rv

                    def send_get(what):
                        # Write input to the subprocess and return what it answers.
                        os.write(emulator, what)
                        return get()

                    with open(test_file, 'rb') as f:
                        # instead of: output_mupy = subprocess.check_output(cmdlist, stdin=f)
                        # openpty returns two read/write file descriptors.  The first one is
                        # used by the program which provides the virtual
                        # terminal service, and the second one is used by the
                        # subprogram which requires a tty to work.
                        emulator, subterminal = pty.openpty()
                        try:
                            p = subprocess.Popen(cmdlist, stdin=subterminal, stdout=subterminal,
                                                 stderr=subprocess.STDOUT, bufsize=0)
                            try:
                                banner = get(True)
                                output_mupy = banner + b''.join(send_get(line) for line in f)
                                send_get(b'\x04') # exit the REPL, so coverage info is saved
                            finally:
                                # always stop and reap the child so that no zombie is left behind
                                p.kill()
                                p.wait()
                        finally:
                            # release the pty even if talking to the subprocess failed
                            os.close(emulator)
                            os.close(subterminal)
                else:
                    output_mupy = subprocess.check_output(cmdlist + [test_file], stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError:
                return b'CRASH'

        else:
            # a standard test run on PC

            # create system command
            cmdlist = [MICROPYTHON, '-X', 'emit=' + args.emit]
            if args.heapsize is not None:
                cmdlist.extend(['-X', 'heapsize=' + args.heapsize])

            # if running via .mpy, first compile the .py file
            if args.via_mpy:
                subprocess.check_output([MPYCROSS, '-mcache-lookup-bc', '-o', 'mpytest.mpy', test_file])
                cmdlist.extend(['-m', 'mpytest'])
            else:
                cmdlist.append(test_file)

            # run the actual test
            e = {"MICROPYPATH": os.getcwd() + ":", "LANG": "en_US.UTF-8"}
            try:
                p = subprocess.Popen(cmdlist, env=e, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                # communicate() reads until EOF and then waits for the process to exit
                output_mupy, _ = p.communicate()
            finally:
                # clean up if we had an intermediate .mpy file
                if args.via_mpy:
                    rm_f('mpytest.mpy')
            if p.returncode != 0:
                output_mupy = b'CRASH'

    else:
        # run on pyboard
        import pyboard
        pyb.enter_raw_repl()
        try:
            output_mupy = pyb.execfile(test_file)
        except pyboard.PyboardError:
            had_crash = True
            output_mupy = b'CRASH'

    # canonical form for all ports/platforms is to use \n for end-of-line
    output_mupy = output_mupy.replace(b'\r\n', b'\n')

    # don't try to convert the output if we should skip this test
    if had_crash or output_mupy in (b'SKIP\n', b'CRASH'):
        return output_mupy

    if is_special or test_file in special_tests:
        # convert parts of the output that are not stable across runs
        with open(test_file + '.exp', 'rb') as f:
            # each entry is (raw line,) for a wildcard line, else (raw line, compiled regex)
            lines_exp = []
            for line in f.readlines():
                if line == b'########\n':
                    line = (line,)
                else:
                    line = (line, re.compile(convert_regex_escapes(line)))
                lines_exp.append(line)
        lines_mupy = [line + b'\n' for line in output_mupy.split(b'\n')]
        if output_mupy.endswith(b'\n'):
            lines_mupy = lines_mupy[:-1] # remove erroneous last empty line
        i_mupy = 0
        for i in range(len(lines_exp)):
            if lines_exp[i][0] == b'########\n':
                # 8x #'s means match 0 or more whole lines
                # NOTE: the wildcard is resolved by searching for the expected line that
                # follows it, so it must be followed by a regular (non-wildcard) line
                line_exp = lines_exp[i + 1]
                skip = 0
                while i_mupy + skip < len(lines_mupy) and not line_exp[1].match(lines_mupy[i_mupy + skip]):
                    skip += 1
                if i_mupy + skip >= len(lines_mupy):
                    # the line after the wildcard never shows up in the output
                    lines_mupy[i_mupy] = b'######## FAIL\n'
                    break
                # collapse the skipped output lines into a single wildcard line
                del lines_mupy[i_mupy:i_mupy + skip]
                lines_mupy.insert(i_mupy, b'########\n')
                i_mupy += 1
            else:
                # a regex
                if lines_exp[i][1].match(lines_mupy[i_mupy]):
                    # normalise the output line to the expected text so it compares equal
                    lines_mupy[i_mupy] = lines_exp[i][0]
                else:
                    #print("don't match: %r %s" % (lines_exp[i][1], lines_mupy[i_mupy])) # DEBUG
                    pass
                i_mupy += 1
            if i_mupy >= len(lines_mupy):
                break
        output_mupy = b''.join(lines_mupy)

    return output_mupy


def run_feature_check(pyb, args, base_path, test_file):
    """Run the script *test_file* found in <base_path>/feature_check.

    The script is run through run_micropython() as a "special" test and the
    bytes returned by that call are handed back unchanged.
    """
    feature_script = "/".join((base_path, "feature_check", test_file))
    return run_micropython(pyb, args, feature_script, is_special=True)

class ThreadSafeCounter:
    """Accumulator whose updates are serialised with a lock.

    The stored value only has to support "+=": the start value may be a
    number (a plain counter) or a list (a collection of items).
    """

    def __init__(self, start=0):
        self._lock = threading.Lock()
        self._value = start

    def add(self, to_add):
        """Apply ``value += to_add`` while holding the lock."""
        with self._lock:
            self._value += to_add

    def append(self, arg):
        """Add the single item *arg*; requires the stored value to accept ``+= [arg]``."""
        self.add([arg])

    @property
    def value(self):
        """The current value (read without taking the lock)."""
        return self._value

def run_tests(pyb, tests, args, base_path=".", num_threads=1):
    test_count = ThreadSafeCounter()
    testcase_count = ThreadSafeCounter()
    passed_count = ThreadSafeCounter()
    failed_tests = ThreadSafeCounter([])
    skipped_tests = ThreadSafeCounter([])

    skip_tests = set()
    skip_native = False
    skip_int_big = False
    skip_set_type = False
    skip_async = False
    skip_const = False
    skip_revops = False
    skip_endian = False
    has_complex = True
    has_coverage = False

    upy_float_precision = 32

    # Some tests shouldn't be run under Travis CI
    if os.getenv('TRAVIS') == 'true':
        skip_tests.add('basics/memoryerror.py')
        skip_tests.add('thread/thread_gc1.py') # has reliability issues
        skip_tests.add('thread/thread_lock4.py') # has reliability issues
        skip_tests.add('thread/stress_heap.py') # has reliability issues
        skip_tests.add('thread/stress_recurse.py') # has reliability issues

    if upy_float_precision == 0:
        skip_tests.add('extmod/ujson_dumps_float.py')
        skip_tests.add('extmod/ujson_loads_float.py')
        skip_tests.add('misc/rge_sm.py')
    if upy_float_precision < 32:
        skip_tests.add('float/float2int_intbig.py') # requires fp32, there's float2int_fp30_intbig.py instead
        skip_tests.add('float/string_format.py') # requires fp32, there's string_format_fp30.py instead
        skip_tests.add('float/bytes_construct.py') # requires fp32
        skip_tests.add('float/bytearray_construct.py') # requires fp32
    if upy_float_precision < 64:
        skip_tests.add('float/float_divmod.py') # tested by float/float_divmod_relaxed.py instead
        skip_tests.add('float/float2int_doubleprec_intbig.py')
        skip_tests.add('float/float_parse_doubleprec.py')

    if not has_complex:
        skip_tests.add('float/complex1.py')
        skip_tests.add('float/complex1_intbig.py')
        skip_tests.add('float/int_big_float.py')
        skip_tests.add('float/true_value.py')
        skip_tests.add('float/types.py')

    if not has_coverage:
        skip_tests.add('cmdline/cmd_parsetree.py')

    # Some tests shouldn't be run on a PC
    if args.target == 'unix':
        # unix build does not have the GIL so can't run thread mutation tests
        for t in tests:
            if t.startswith('thread/mutate_'):
                skip_tests.add(t)

    # Some tests shouldn't be run on pyboard
    if args.target != 'unix':
        skip_tests.add('basics/exception_chain.py') # warning is not printed
        skip_tests.add('micropython/meminfo.py') # output is very different to PC output
        skip_tests.add('extmod/machine_mem.py') # raw memory access not supported

        if args.target == 'wipy':
            skip_tests.add('misc/print_exception.py')       # requires error reporting full
            skip_tests.update({'extmod/uctypes_%s.py' % t for t in 'bytearray le native_le ptr_le ptr_native_le sizeof sizeof_native array_assign_le array_assign_native_le'.split()}) # requires uctypes
            skip_tests.add('extmod/zlibd_decompress.py')    # requires zlib
            skip_tests.add('extmod/uheapq1.py')             # uheapq not supported by WiPy
            skip_tests.add('extmod/urandom_basic.py')       # requires urandom
            skip_tests.add('extmod/urandom_extra.py')       # requires urandom
        elif args.target == 'esp8266':
            skip_tests.add('misc/rge_sm.py')                # too large
        elif args.target == 'minimal':
            skip_tests.add('basics/class_inplace_op.py')    # all special methods not supported
            skip_tests.add('basics/subclass_native_init.py')# native subclassing corner cases not support
            skip_tests.add('misc/rge_sm.py')                # too large
            skip_tests.add('micropython/opt_level.py')      # don't assume line numbers are stored

    # Some tests are known to fail on 64-bit machines
    if pyb is None and platform.architecture()[0] == '64bit':
        pass

    # Some tests use unsupported features on Windows
    if os.name == 'nt':
        skip_tests.add('import/import_file.py') # works but CPython prints forward slashes

    # Some tests are known to fail with native emitter
    # Remove them from the below when they work
    if args.emit == 'native':
        skip_tests.update({'basics/%s.py' % t for t in 'gen_yield_from gen_yield_from_close gen_yield_from_ducktype gen_yield_from_exc gen_yield_from_executing gen_yield_from_iter gen_yield_from_send gen_yield_from_stopped gen_yield_from_throw gen_yield_from_throw2 gen_yield_from_throw3 generator1 generator2 generator_args generator_close generator_closure generator_exc generator_pend_throw generator_return generator_send'.split()}) # require yield
        skip_tests.update({'basics/%s.py' % t for t in 'bytes_gen class_store_class globals_del string_join gen_stack_overflow'.split()}) # require yield
        skip_tests.update({'basics/async_%s.py' % t for t in 'def await await2 for for2 with with2 coroutine'.split()}) # require yield
        skip_tests.update({'basics/%s.py' % t for t in 'try_reraise try_reraise2'.split()}) # require raise_varargs
        skip_tests.update({'basics/%s.py' % t for t in 'with_break with_continue with_return'.split()}) # require complete with support
        skip_tests.add('basics/array_construct2.py') # requires generators
        skip_tests.add('basics/bool1.py') # seems to randomly fail
        skip_tests.add('basics/builtin_hash_gen.py') # requires yield
        skip_tests.add('basics/class_bind_self.py') # requires yield
        skip_tests.add('basics/del_deref.py') # requires checking for unbound local
        skip_tests.add('basics/del_local.py') # requires checking for unbound local
        skip_tests.add('basics/exception_chain.py') # raise from is not supported
        skip_tests.add('basics/for_range.py') # requires yield_value
        skip_tests.add('basics/try_finally_loops.py') # requires proper try finally code
        skip_tests.add('basics/try_finally_return.py') # requires proper try finally code
        skip_tests.add('basics/try_finally_return2.py') # requires proper try finally code
        skip_tests.add('basics/unboundlocal.py') # requires checking for unbound local
        skip_tests.add('import/gen_context.py') # requires yield_value
        skip_tests.add('misc/features.py') # requires raise_varargs
        skip_tests.add('misc/rge_sm.py') # requires yield
        skip_tests.add('misc/print_exception.py') # because native doesn't have proper traceback info
        skip_tests.add('misc/sys_exc_info.py') # sys.exc_info() is not supported for native
        skip_tests.add('micropython/emg_exc.py') # because native doesn't have proper traceback info
        skip_tests.add('micropython/heapalloc_traceback.py') # because native doesn't have proper traceback info
        skip_tests.add('micropython/heapalloc_iter.py') # requires generators
        skip_tests.add('micropython/schedule.py') # native code doesn't check pending events
        skip_tests.add('stress/gc_trace.py') # requires yield
        skip_tests.add('stress/recursive_gen.py') # requires yield
        skip_tests.add('extmod/vfs_userfs.py') # because native doesn't properly handle globals across different modules
        skip_tests.add('../extmod/ulab/tests/argminmax.py') # requires yield

    def run_one_test(test_file):
        test_file = test_file.replace('\\', '/')

        if args.filters:
            # Default verdict is the opposit of the first action
            verdict = "include" if args.filters[0][0] == "exclude" else "exclude"
            for action, pat in args.filters:
                if pat.search(test_file):
                    verdict = action
            if verdict == "exclude":
                return

        test_basename = os.path.basename(test_file)
        test_name = os.path.splitext(test_basename)[0]
        is_native = test_name.startswith("native_") or test_name.startswith("viper_")
        is_endian = test_name.endswith("_endian")
        is_int_big = test_name.startswith("int_big") or test_name.endswith("_intbig")
        is_set_type = test_name.startswith("set_") or test_name.startswith("frozenset")
        is_async = test_name.startswith("async_")
        is_const = test_name.startswith("const")

        skip_it = test_file in skip_tests
        skip_it |= skip_native and is_native
        skip_it |= skip_endian and is_endian
        skip_it |= skip_int_big and is_int_big
        skip_it |= skip_set_type and is_set_type
        skip_it |= skip_async and is_async
        skip_it |= skip_const and is_const
        skip_it |= skip_revops and test_name.startswith("class_reverse_op")

        if args.list_tests:
            if not skip_it:
                print(test_file)
            return

        if skip_it:
            print("skip ", test_file)
            skipped_tests.append(test_name)
            return

        # get expected output
        test_file_expected = test_file + '.exp'
        if os.path.isfile(test_file_expected):
            # expected output given by a file, so read that in
            with open(test_file_expected, 'rb') as f:
                output_expected = f.read()
        else:
            if not args.write_exp:
                output_expected = b"NOEXP\n"
            else:
                # run CPython to work out expected output
                e = {"PYTHONPATH": os.getcwd(),
                     "PATH": os.environ["PATH"],
                     "LANG": "en_US.UTF-8"}
                p = subprocess.Popen([MICROPYTHON, test_file], env=e, stdout=subprocess.PIPE)
                output_expected = b''
                while p.poll() is None:
                    output_expected += p.stdout.read()
                output_expected += p.stdout.read()
                with open(test_file_expected, 'wb') as f:
                    f.write(output_expected)

        # canonical form for all host platforms is to use \n for end-of-line
        output_expected = output_expected.replace(b'\r\n', b'\n')

        if args.write_exp:
            return

        # run MicroPython
        output_mupy = run_micropython(pyb, args, test_file)

        if output_mupy == b'SKIP\n':
            print("skip ", test_file)
            skipped_tests.append(test_name)
            return

        if output_expected == b'NOEXP\n':
            print("noexp", test_file)
            failed_tests.append(test_name)
            return

        testcase_count.add(len(output_expected.splitlines()))

        filename_expected = test_basename + ".exp"
        filename_mupy = test_basename + ".out"

        if output_expected == output_mupy:
            print("pass ", test_file)
            passed_count.add(1)
            rm_f(filename_expected)
            rm_f(filename_mupy)
        else:
            with open(filename_expected, "wb") as f:
                f.write(output_expected)
            with open(filename_mupy, "wb") as f:
                f.write(output_mupy)
            print("### Expected")
            print(output_expected)
            print("### Actual")
            print(output_mupy)
            print("FAIL ", test_file)
            failed_tests.append(test_name)

        test_count.add(1)

    if args.list_tests:
        return True

    if num_threads > 1:
        pool = ThreadPool(num_threads)
        pool.map(run_one_test, tests)
    else:
        for test in tests:
            run_one_test(test)

    print("{} tests performed ({} individual testcases)".format(test_count.value, testcase_count.value))
    print("{} tests passed".format(passed_count.value))

    if len(skipped_tests.value) > 0:
        print("{} tests skipped: {}".format(len(skipped_tests.value), ' '.join(sorted(skipped_tests.value))))
    if len(failed_tests.value) > 0:
        print("{} tests failed: {}".format(len(failed_tests.value), ' '.join(sorted(failed_tests.value))))
        return False

    # all tests succeeded
    return True


class append_filter(argparse.Action):
    """argparse action that collects include/exclude regex filters in order.

    Each use of the option appends a tuple (action, compiled_regex) to the
    list stored under the option's dest, where action is "exclude" if the
    option string starts with "-e" or "--e" and "include" otherwise.  Several
    options can share one dest to build a single ordered list.
    """

    def __init__(self, option_strings, dest, **kwargs):
        # an empty list is the default so the attribute always exists after parsing
        super().__init__(option_strings, dest, default=[], **kwargs)

    def __call__(self, parser, args, value, option):
        # Work on a copy: the list found in the namespace may be the default
        # object owned by the action, and appending to it in place would make
        # filters pile up across repeated parse_args() calls.
        filters = list(getattr(args, self.dest, None) or [])
        if (option or "").startswith(("-e", "--e")):
            action = "exclude"
        else:
            action = "include"
        filters.append((action, re.compile(value)))
        setattr(args, self.dest, filters)


def main():
    """Command-line entry point of the test runner.

    Parses the options, connects to the board for an external target,
    determines the list of test scripts, runs them through run_tests() and
    exits with status 1 if run_tests() reports a failure.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description='Run and manage tests for MicroPython.',
        epilog='''\
Options -i and -e can be multiple and processed in the order given. Regex
"search" (vs "match") operation is used. An action (include/exclude) of
the last matching regex is used:
  run-tests -i async - exclude all, then include tests containg "async" anywhere
  run-tests -e '/big.+int' - include all, then exclude by regex
  run-tests -e async -i async_foo - include all, exclude async, yet still include async_foo
''')
    add = parser.add_argument
    add('--target', default='unix', help='the target platform')
    add('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard')
    add('-b', '--baudrate', default=115200, help='the baud rate of the serial device')
    add('-u', '--user', default='micro', help='the telnet login username')
    add('-p', '--password', default='python', help='the telnet login password')
    add('-d', '--test-dirs', nargs='*', help='input test directories (if no files given)')
    add('-e', '--exclude', action=append_filter, metavar='REGEX', dest='filters', help='exclude test by regex on path/name.py')
    add('-i', '--include', action=append_filter, metavar='REGEX', dest='filters', help='include test by regex on path/name.py')
    add('--write-exp', action='store_true', help='save .exp files to run tests w/o CPython')
    add('--list-tests', action='store_true', help='list tests instead of running them')
    add('--emit', default='bytecode', help='MicroPython emitter to use (bytecode or native)')
    add('--heapsize', help='heapsize to use (use default if not specified)')
    add('--via-mpy', action='store_true', help='compile .py files to .mpy first')
    add('--keep-path', action='store_true', help='do not clear MICROPYPATH when running tests')
    add('-j', '--jobs', default=1, metavar='N', type=int, help='Number of tests to run simultaneously')
    add('--auto-jobs', action='store_const', dest='jobs', const=multiprocessing.cpu_count(), help='Set the -j values to the CPU (thread) count')
    add('files', nargs='*', help='input test files')
    args = parser.parse_args()

    # A board connection is only needed when tests are really run on an external target
    external_targets = ('pyboard', 'wipy', 'esp8266', 'esp32', 'minimal')
    pyb = None
    if args.target != 'unix' and not args.list_tests:
        if args.target not in external_targets:
            raise ValueError('target must be either %s or unix' % ", ".join(external_targets))
        import pyboard
        pyb = pyboard.Pyboard(args.device, args.baudrate, args.user, args.password)
        pyb.enter_raw_repl()

    if args.files:
        # tests explicitly given
        tests = args.files
    else:
        if args.test_dirs is not None:
            # run tests from these directories
            test_dirs = args.test_dirs
        elif args.target == 'pyboard':
            # run pyboard tests
            test_dirs = ('basics', 'micropython', 'float', 'misc', 'stress', 'extmod', 'pyb', 'pybnative', 'inlineasm')
        elif args.target in ('esp8266', 'esp32', 'minimal'):
            test_dirs = ('basics', 'micropython', 'float', 'misc', 'extmod')
        elif args.target == 'wipy':
            # run WiPy tests
            test_dirs = ('basics', 'micropython', 'misc', 'extmod', 'wipy')
        else:
            # run PC tests
            test_dirs = (
                'basics', 'micropython', 'float', 'import', 'io', 'misc',
                'stress', 'unicode', 'extmod', '../extmod/ulab/tests', 'unix', 'cmdline',
            )
        # every .py file directly inside the chosen directories, in sorted order
        tests = sorted(
            test_file
            for test_dir in test_dirs
            for test_file in glob('{}/*.py'.format(test_dir))
        )

    if not args.keep_path:
        # clear search path to make sure tests use only builtin modules
        os.environ['MICROPYPATH'] = ''

    # Even if we run completely different tests in a different directory,
    # we need to access feature_check's from the same directory as the
    # run-tests script itself.
    base_path = os.path.dirname(sys.argv[0]) or "."
    try:
        all_passed = run_tests(pyb, tests, args, base_path, args.jobs)
    finally:
        # always release the board connection, also when run_tests() raises
        if pyb:
            pyb.close()

    if not all_passed:
        sys.exit(1)

# Run the test driver only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
